package com.demo.window;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Demo of a sliding count window over a non-keyed stream.
 *
 * <p>Reads text lines from a socket, parses each line as a {@code Long}, and, every time
 * 2 new elements have arrived, prints the sum of the most recent (at most) 4 elements.
 */
public class SlidingCountWindow {

    /**
     * Builds and runs the streaming job.
     *
     * <p>The source is a text socket on {@code localhost:9999}; each received line is expected
     * to contain a single integer. Leading and trailing whitespace around the number is ignored.
     * A line that is not a valid {@code long} after trimming makes the map function throw
     * {@link NumberFormatException}.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Single parallel instance so that the printed output is produced by one task.
        env.setParallelism(1);

        SingleOutputStreamOperator<Long> inputStream = env.socketTextStream("localhost", 9999)
                .map(new MapFunction<String, Long>() {
                    @Override
                    public Long map(String val) throws Exception {
                        // Trim first: Long.valueOf rejects surrounding whitespace, so a stray
                        // space typed into the socket would otherwise fail the whole job.
                        return Long.valueOf(val.trim());
                    }
                });

        // Window size 4, slide 2; position 0 refers to the Long element itself.
        inputStream.countWindowAll(4, 2).sum(0).print("每来2条数据，统计最近4条数据之和");
        env.execute("每来2条数据，统计最近4条数据之和");
    }
}
